{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "22b842be-6a82-4127-b937-ead4103a92e8",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Intitializing Scala interpreter ..."
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Spark Web UI available at http://120fa5ac9a2d:4041\n",
       "SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1734316680820)\n",
       "SparkSession available as 'spark'\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@442b0b59\n"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2295da8f-3623-43ba-af46-6fe19ec9ffca",
   "metadata": {},
   "source": [
    "# If something is nullabe, you need to wrap the value type in Option[] - this helps enforce assumptions about the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "73b5384f-be28-49e3-9bcf-4b9783ba7d91",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "import org.apache.spark.sql.SparkSession\n",
       "sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@442b0b59\n",
       "defined class Event\n"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import org.apache.spark.sql.SparkSession \n",
    "\n",
    "val sparkSession = SparkSession.builder.appName(\"Juptyer\").getOrCreate()\n",
    "\n",
    "//TODO Illustrate how this fails if you change from Option[String] to String for referrer\n",
    "case class Event (\n",
    "   //Option is a way to handle NULL more gracefully\n",
    "    user_id: Option[Integer],\n",
    "    device_id: Option[Integer],\n",
    "    referrer: Option[String],\n",
    "    host: String,\n",
    "    url: String,\n",
    "    event_time: String\n",
    ")\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "25c3320b-03ab-4969-88a7-f7771534d70c",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "dummyData: List[Event] = List(Event(Some(1),Some(2),Some(linkedin),eczachly.com,/signup,2023-01-01), Event(Some(3),Some(7),Some(twitter),eczachly.com,/signup,2023-01-01))\n"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "val dummyData = List(\n",
    "        Event(user_id=Some(1), device_id=Some(2), referrer=Some(\"linkedin\"), host=\"eczachly.com\", url=\"/signup\", event_time=\"2023-01-01\"),\n",
    "        Event(user_id=Some(3), device_id=Some(7), referrer=Some(\"twitter\"), host=\"eczachly.com\", url=\"/signup\", event_time=\"2023-01-01\")\n",
    "    )\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "f7190f95-33b1-4779-8f0d-40e9046e8157",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined class Device\n",
       "defined class EventWithDeviceInfo\n"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "//TODO Illustrate how this fails if you change from Option[Long] to Long\n",
    "case class Device (\n",
    "    device_id: Integer,\n",
    "    browser_type: String,\n",
    "    os_type: String,\n",
    "    device_type: String\n",
    ")\n",
    "\n",
    "case class EventWithDeviceInfo (\n",
    "   user_id: Integer,\n",
    "    device_id: Integer,\n",
    "    browser_type: String,\n",
    "    os_type: String,\n",
    "    device_type: String,\n",
    "    referrer: String,\n",
    "    host: String,\n",
    "    url: String,\n",
    "    event_time: String\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "d045ae06-0f21-42f1-a271-f455e95b0879",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "import org.apache.spark.sql.Dataset\n",
       "import sparkSession.implicits._\n",
       "events: org.apache.spark.sql.Dataset[Event] = [user_id: int, device_id: int ... 4 more fields]\n",
       "devices: org.apache.spark.sql.Dataset[Device] = [device_id: int, browser_type: string ... 2 more fields]\n",
       "filteredViaDataset: org.apache.spark.sql.Dataset[Event] = [user_id: int, device_id: int ... 4 more fields]\n",
       "filteredViaDataFrame: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: int, device_id: int ... 4 more fields]\n",
       "filteredViaSparkSql: org.apache.spark.sql.DataFrame = [user_id: int, device_id: int ... 4 more fields]\n"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "import org.apache.spark.sql.Dataset\n",
    "\n",
    "import sparkSession.implicits._\n",
    "\n",
    "// Applying this case class before hand is very powerful, enforces Nullability/non-nullability at runtime!\n",
    "val events: Dataset[Event] = sparkSession.read.option(\"header\", \"true\")\n",
    "                        .option(\"inferSchema\", \"true\")\n",
    "                        .csv(\"/home/iceberg/data/events.csv\")\n",
    "                        .as[Event]\n",
    "\n",
    "val devices: Dataset[Device] = sparkSession.read.option(\"header\", \"true\")\n",
    "                        .option(\"inferSchema\", \"true\")\n",
    "                        .csv(\"/home/iceberg/data/devices.csv\")\n",
    "                        .as[Device]\n",
    "\n",
    "devices.createOrReplaceTempView(\"devices\")\n",
    "events.createOrReplaceTempView(\"events\")\n",
    "\n",
    "// For simple transformations, you can see that these approaches are very similar. Dataset is winning slightly because of the quality enforcement\n",
    "val filteredViaDataset = events.filter(event => event.user_id.isDefined && event.device_id.isDefined)\n",
    "val filteredViaDataFrame = events.toDF().where($\"user_id\".isNotNull && $\"device_id\".isNotNull)\n",
    "val filteredViaSparkSql = sparkSession.sql(\"SELECT * FROM events WHERE user_id IS NOT NULL AND device_id IS NOT NULL\")\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "80517409-a26e-4aa3-b383-56b928f0c9d3",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "combinedViaDatasets: org.apache.spark.sql.Dataset[EventWithDeviceInfo] = [user_id: int, device_id: int ... 7 more fields]\n"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "// This will fail if user_id is None\n",
    "val combinedViaDatasets = filteredViaDataset\n",
    "    .joinWith(devices, events(\"device_id\") === devices(\"device_id\"), \"inner\")\n",
    "    .map{ case (event: Event, device: Device) => EventWithDeviceInfo(\n",
    "                  user_id=event.user_id.get,\n",
    "                  device_id=device.device_id,\n",
    "                  browser_type=device.browser_type,\n",
    "                  os_type=device.os_type,\n",
    "                  device_type=device.device_type,\n",
    "                  referrer=event.referrer.getOrElse(\"unknow\"),\n",
    "                  host=event.host,\n",
    "                  url=event.url,\n",
    "                  event_time=event.event_time\n",
    "              ) }\n",
    "  .map { eventWithDevice =>\n",
    "        // Convert browser_type to uppercase while maintaining immutability\n",
    "        eventWithDevice.copy(browser_type = eventWithDevice.browser_type.toUpperCase)\n",
    "    }\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "3ce150e3-e5b7-4ece-8803-8189762625ea",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "EventWithDeviceInfo(1037710827,532630305,OTHER,Other,Other,unknow,www.zachwilson.tech,/,2021-03-08 17:27:24.241)\n",
      "EventWithDeviceInfo(925588856,532630305,OTHER,Other,Other,unknow,www.eczachly.com,/,2021-05-10 11:26:21.247)\n",
      "EventWithDeviceInfo(-1180485268,532630305,OTHER,Other,Other,unknow,admin.zachwilson.tech,/,2021-02-17 16:19:30.738)\n",
      "EventWithDeviceInfo(-1044833855,532630305,OTHER,Other,Other,unknow,www.zachwilson.tech,/,2021-09-24 15:53:14.466)\n",
      "EventWithDeviceInfo(747494706,532630305,OTHER,Other,Other,unknow,www.zachwilson.tech,/,2021-09-26 16:03:17.535)\n",
      "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n",
      "|    user_id|device_id|browser_type|os_type|device_type|referrer|                host|url|          event_time|\n",
      "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n",
      "| 1037710827|532630305|       Other|  Other|      Other|    NULL| www.zachwilson.tech|  /|2021-03-08 17:27:...|\n",
      "|  925588856|532630305|       Other|  Other|      Other|    NULL|    www.eczachly.com|  /|2021-05-10 11:26:...|\n",
      "|-1180485268|532630305|       Other|  Other|      Other|    NULL|admin.zachwilson....|  /|2021-02-17 16:19:...|\n",
      "|-1044833855|532630305|       Other|  Other|      Other|    NULL| www.zachwilson.tech|  /|2021-09-24 15:53:...|\n",
      "|  747494706|532630305|       Other|  Other|      Other|    NULL| www.zachwilson.tech|  /|2021-09-26 16:03:...|\n",
      "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n",
      "only showing top 5 rows\n",
      "\n",
      "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n",
      "|    user_id|device_id|browser_type|os_type|device_type|referrer|                host|url|          event_time|\n",
      "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n",
      "| 1037710827|532630305|       Other|  Other|      Other|    NULL| www.zachwilson.tech|  /|2021-03-08 17:27:...|\n",
      "|  925588856|532630305|       Other|  Other|      Other|    NULL|    www.eczachly.com|  /|2021-05-10 11:26:...|\n",
      "|-1180485268|532630305|       Other|  Other|      Other|    NULL|admin.zachwilson....|  /|2021-02-17 16:19:...|\n",
      "|-1044833855|532630305|       Other|  Other|      Other|    NULL| www.zachwilson.tech|  /|2021-09-24 15:53:...|\n",
      "|  747494706|532630305|       Other|  Other|      Other|    NULL| www.zachwilson.tech|  /|2021-09-26 16:03:...|\n",
      "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "combinedViaDataFrames: org.apache.spark.sql.DataFrame = [user_id: int, device_id: int ... 7 more fields]\n",
       "combinedViaSparkSQL: org.apache.spark.sql.DataFrame = [user_id: int, device_id: int ... 7 more fields]\n",
       "rows: Array[EventWithDeviceInfo] = Array(EventWithDeviceInfo(1037710827,532630305,OTHER,Other,Other,unknow,www.zachwilson.tech,/,2021-03-08 17:27:24.241), EventWithDeviceInfo(925588856,532630305,OTHER,Other,Other,unknow,www.eczachly.com,/,2021-05-10 11:26:21.247), EventWithDeviceInfo(-1180485268,532630305,OTHER,Other,Other,unknow,admin.zachwilson.tech,/,2021-02-17 16:19:30.738), EventWithDeviceInfo(-1044833855,532630305,OTHER,Other,Other,unknow,www.zachwilson.tech,/,2021-09-24 15:53:14.466), EventWithDeviceInfo(747494706,532630305,OTHER,Other,Other,unknow,www.zachwilson.tech,/,2021-...\n"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "// DataFrames give up some of the intellisense because you no longer have static typing\n",
    "val combinedViaDataFrames = filteredViaDataFrame.as(\"e\")\n",
    "            //Make sure to use triple equals when using data frames\n",
    "            .join(devices.as(\"d\"), $\"e.device_id\" === $\"d.device_id\", \"inner\")\n",
    "            .select(\n",
    "              $\"e.user_id\",\n",
    "              $\"d.device_id\",\n",
    "              $\"d.browser_type\",\n",
    "              $\"d.os_type\",\n",
    "              $\"d.device_type\",\n",
    "              $\"e.referrer\",\n",
    "              $\"e.host\",\n",
    "              $\"e.url\",\n",
    "              $\"e.event_time\"\n",
    "            )\n",
    "\n",
    "//Creating temp views is a good strategy if you're leveraging SparkSQL\n",
    "filteredViaSparkSql.createOrReplaceTempView(\"filtered_events\")\n",
    "val combinedViaSparkSQL = spark.sql(f\"\"\"\n",
    "    SELECT \n",
    "        fe.user_id,\n",
    "        d.device_id,\n",
    "        d.browser_type,\n",
    "        d.os_type,\n",
    "        d.device_type,\n",
    "        fe. referrer,\n",
    "        fe.host,\n",
    "        fe.url,\n",
    "        fe.event_time\n",
    "    FROM filtered_events fe \n",
    "    JOIN devices d ON fe.device_id = d.device_id\n",
    "\"\"\")\n",
    "\n",
    "\n",
    "val rows= combinedViaDatasets.take(5)\n",
    "rows.foreach(println)\n",
    "combinedViaDataFrames.show(5)\n",
    "combinedViaSparkSQL.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1db45765-b3d1-4059-92f1-e0414f502eb7",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "spylon-kernel",
   "language": "scala",
   "name": "spylon-kernel"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "help_links": [
    {
     "text": "MetaKernel Magics",
     "url": "https://metakernel.readthedocs.io/en/latest/source/README.html"
    }
   ],
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "0.4.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
